Hyperf 协程解析

一、协程是什么?

你可以把协程理解为一种轻量级的线程,是用户态下的任务调度单位。它由用户代码来调度和管理,而不是由操作系统内核来进行调度。一个 Worker 进程可以拥有大量的协程(例如,默认配置下一个进程最多可创建 100,000 个协程)。

二、Hyperf 协程的核心实现

Hyperf 的协程能力主要构建在 Swoole 扩展之上。

1. 协作式调度 (Cooperative Scheduling)

Hyperf 的协程调度器采用协作式调度。这意味着:

2. 协程调度器:智慧的“大脑”

协程调度器就像是协程的“指挥中心”,它主要包含以下几个部分:

协程调度器工作流程图
flowchart TD
A[创建新协程] --> B["协程进入就绪队列"]
B --> C{调度器选择协程执行}
C --> D["协程执行计算任务
(CPU操作)"] D --> E["执行完毕?"] E -- 是 --> F[协程结束] E -- 否 --> G{"遇到I/O操作或sleep?"} G -- 否 --> D G -- 是 --> H["协程挂起 (yield)
加入阻塞队列"] H --> I["注册I/O事件回调
或启动定时器"] I --> J{"I/O完成或超时?"} J -- 是 --> K["事件回调将协程
重新放回就绪队列"] K --> C

3. 协程的创建

在 Hyperf 中,你可以使用 co() 函数或 Coroutine::create() 来创建协程:

co(function () {
// 你的协程代码
$result = someAsyncOperation();
});

三、协程间的通信与同步

因为协程是共享内存的,所以当它们需要协作或共享数据时,就需要安全的通信和同步机制。

Channel

类似于 Go 语言的 chan,为多生产者协程和多消费者协程模式提供支持。Channel 底层自动实现了协程的切换和调度。

WaitGroup

基于 Channel 实现,用于等待一组协程全部执行完毕

use Hyperf\Utils\WaitGroup; // 注意:新版本中可能在 Utils\Coroutine 下

$wg = new WaitGroup();
$wg->add(2); // 计数器加2

co(function () use ($wg) {
	// 任务A
	$wg->done(); // 计数器减1
});

co(function () use ($wg) {
	// 任务B
	$wg->done(); // 计数器减1
});

$wg->wait(); // 阻塞,直到计数器变为0

Parallel

这是对 WaitGroup 的一个更高级的封装,让你能更方便地并行执行多个任务并收集结果。你还可以限制最大并发数,防止瞬时并发过高压垮下游服务。

use Hyperf\Utils\Coroutine\Parallel; // 注意:新版本中命名空间可能有所不同

$parallel = new Parallel(5); // 最大并发数限制为5
for ($i = 0; $i < 20; $i++) {
	$parallel->add(function () use ($i) {
		return doSomeJob($i);
	});
}
$results = $parallel->wait(); // 获取所有任务的结果数组

Concurrent

用于控制一个代码块内同时运行的最大协程数量

use Hyperf\Utils\Coroutine\Concurrent;

$concurrent = new Concurrent(10);
for ($i = 0; $i < 15; ++$i) {
	$concurrent->create(function () {
		// Do something... 最多同时10个协程执行这个代码块
	});
}

四、透传协程上下文

这是 Hyperf 协程编程中一个非常重要且容易踩坑的概念

解决方案

显式复制

在创建子协程时,手动将父协程的上下文复制过去。

$cid = SwooleCo::getCid(); // 获取当前协程ID
$pid = SwooleCo::getPcid($cid); // 获取父协程ID
co(function () use ($pid) {
	Context::copy($pid); // 关键:复制父协程上下文
	// ... 现在可以获取到父协程的上下文数据了
	$requestId = Context::getTRACE_ID;
});
链式查找

通过 SwooleCo::getPcid($coId) 获取父协程 ID,然后逐级向上查找需要的上下文数据。

/**
 * 获取请求ID
 * @return string
 */
public static function getRequestId(): string
{
	// 获取当前协程ID
	$currCoId = $coId = SwooleCo::getCid();
	// 获取当前协程里没有请求ID时,会去查询父协程中的请求ID,直到顶级协程为止
	do {
		$requestId = Context::getTRACE_ID, '', $coId;
	} while (!$requestId && $coId > 0 && ($coId = SwooleCo::getPcid($coId)));
	// 父子协程共享trace_id
	if ($requestId && $currCoId != $coId) {
		Context::setTRACE_ID, $requestId;
	}
	// 创建请求ID
	if (empty($requestId)) {
		$requestId = IdGenTool::uuidV4();
		Context::setTRACE_ID, $requestId;
	}
	return $requestId;
}